﻿#include <ctime>
#include <sstream>
#ifdef WIN32
#include <hiredis.h>
#include <winsock.h>
#else
#include <hiredis/hiredis.h>
#include <sys/time.h>
#endif
#include "../../src/repo/src/include/root/coroutine/coroutine.h"
#include "../box/service_box.hh"
#include "../detail/lang_impl.hh"
#include "../util/object_pool.hh"
#include "../util/os_util.hh"
#include "../util/string_util.hh"
#include "../util/time_util.hh"
#include "redis_impl.hh"

kratos::redis::RedisImpl::RedisImpl(service::ServiceBox *box) { box_ = box; }

kratos::redis::RedisImpl::~RedisImpl() { stop(); }

auto kratos::redis::RedisImpl::start() -> bool { return true; }

auto kratos::redis::RedisImpl::stop() -> bool {
  for (auto &it : worker_map_) {
    for (const auto &worker : it.second) {
      worker->stop();
    }
  }
  worker_map_.clear();
  return true;
}

auto kratos::redis::RedisImpl::update(std::time_t ms) -> std::size_t {
  std::size_t count = 0;
  for (auto &it : worker_map_) {
    for (const auto &worker : it.second) {
      count += worker->update(ms);
    }
  }
  return count;
}

auto kratos::redis::RedisImpl::add_host(const std::string &name,
                                        const std::string &host, int port,
                                        const std::string &user,
                                        const std::string &passwd) -> bool {
  if (name.empty() || host.empty() || ((0 >= port) || (port >= 65535))) {
    if (box_) {
      std::stringstream error;
      error << "name[" << name << "], host[" << host << "], port[" << port
            << "]";
      box_->write_log(lang::LangID::LANG_REDIS_INVALID_ARGUMENT,
                      klogger::Logger::FAILURE, error.str().c_str());
    }
    return false;
  }
  worker_map_[name].emplace_back(
      std::move(kratos::make_unique_pool_ptr<RedisWorker>(box_)));
  return worker_map_[name].back()->start(host, port, user, passwd);
}

auto kratos::redis::RedisImpl::do_command(const std::string &name,
                                          const std::string &command,
                                          std::time_t timeout,
                                          RedisHandler handler,
                                          std::uint64_t user_data) -> bool {
  if (name.empty() || command.empty()) {
    if (box_) {
      std::stringstream error;
      error << "name[" << name << "], command[" << command << "]";
      box_->write_log(lang::LangID::LANG_REDIS_INVALID_ARGUMENT,
                      klogger::Logger::FAILURE, error.str().c_str());
    }
    return false;
  }
  if (!timeout) {
    timeout = 1000;
  }
  auto it = worker_map_.find(name);
  if (it == worker_map_.end()) {
    // 未找到主机
    return false;
  }
  if (it->second.empty()) {
    // 未找到主机
    return false;
  }
  auto size = it->second.size();
  // TODO 轮询
  auto index = util::get_random_uint32(0, static_cast<std::uint32_t>(size - 1));
  return it->second.at(index)->do_command(command, timeout, handler, user_data);
}

auto kratos::redis::RedisImpl::do_command(const std::string &name,
                                          const CommandVector &cmd_vec,
                                          std::time_t timeout,
                                          RedisHandler handler,
                                          std::uint64_t user_data) -> bool {
  if (name.empty() || cmd_vec.empty()) {
    if (box_) {
      std::stringstream error;
      error << "name[" << name << "], command[batch command...]";
      box_->write_log(lang::LangID::LANG_REDIS_INVALID_ARGUMENT,
                      klogger::Logger::FAILURE, error.str().c_str());
    }
    return false;
  }
  if (!timeout) {
    timeout = 1000;
  }
  auto it = worker_map_.find(name);
  if (it == worker_map_.end()) {
    // 未找到主机
    return false;
  }
  if (it->second.empty()) {
    // 未找到主机
    return false;
  }
  auto size = it->second.size();
  // TODO 轮询
  auto index = util::get_random_uint32(0, static_cast<std::uint32_t>(size - 1));
  return it->second.at(index)->do_command(cmd_vec, timeout, handler, user_data);
}

auto kratos::redis::RedisImpl::do_command_co(const std::string &name,
                                             const std::string &command,
                                             std::time_t timeout)
    -> std::unique_ptr<Result> {
  if (name.empty() || command.empty()) {
    if (box_) {
      std::stringstream error;
      error << "name[" << name << "], command[" << command << "]";
      box_->write_log(lang::LangID::LANG_REDIS_INVALID_ARGUMENT,
                      klogger::Logger::FAILURE, error.str().c_str());
    }
    return nullptr;
  }
  if (!timeout) {
    timeout = 1000;
  }
  if (coro_is_main()) {
    // 不能在主协程内执行
    return nullptr;
  }
  auto it = worker_map_.find(name);
  if (it == worker_map_.end()) {
    // 未找到主机
    return nullptr;
  }
  if (it->second.empty()) {
    // 未找到主机
    return nullptr;
  }
  auto size = it->second.size();
  // TODO 轮询
  auto index = util::get_random_uint32(0, static_cast<std::uint32_t>(size - 1));
  return it->second.at(index)->do_command_co(command, timeout);
}

auto kratos::redis::RedisImpl::do_command_co(const std::string &name,
                                             const CommandVector &cmd_vec,
                                             std::time_t timeout)
    -> std::unique_ptr<Result> {
  if (name.empty() || cmd_vec.empty()) {
    if (box_) {
      std::stringstream error;
      error << "name[" << name << "], command[batch command...]";
      box_->write_log(lang::LangID::LANG_REDIS_INVALID_ARGUMENT,
                      klogger::Logger::FAILURE, error.str().c_str());
    }
    return nullptr;
  }
  if (!timeout) {
    timeout = 1000;
  }
  if (coro_is_main()) {
    // 不能在主协程内执行
    return nullptr;
  }
  auto it = worker_map_.find(name);
  if (it == worker_map_.end()) {
    // 未找到主机
    return nullptr;
  }
  if (it->second.empty()) {
    // 未找到主机
    return nullptr;
  }
  auto size = it->second.size();
  // TODO 轮询
  auto index = util::get_random_uint32(0, static_cast<std::uint32_t>(size - 1));
  return it->second.at(index)->do_command_co(cmd_vec, timeout);
}

kratos::redis::RedisWorker::RedisWorker(service::ServiceBox *box) {
  box_ = box;
}

kratos::redis::RedisWorker::~RedisWorker() { stop(); }

auto kratos::redis::RedisWorker::start(const std::string &host, int port,
                                       const std::string &user,
                                       const std::string &passwd) -> bool {

  running_ = true;
  ctx_ = connect_redis(host, port, user, passwd);
  if (!ctx_) {
    if (box_) {
      std::stringstream error;
      error << "host[" << host << ":" << port << "], user[" << user
            << "], passwd[" << passwd << "]";
      box_->write_log(lang::LangID::LANG_REDIS_CONNECT_FAIL,
                      klogger::Logger::FAILURE, error.str().c_str());
    }
    return false;
  }
  worker_ = std::thread([&]() -> void {
    ResultImpl *result = nullptr;
    while (running_) {
      result = nullptr;
      auto retval = worker_queue_.try_read(result);
      if (retval) {
        struct timeval tv {
          static_cast<long>(result->get_timeout() / std::time_t(1000)),
              static_cast<long>((result->get_timeout() % std::time_t(1000)) *
                                std::time_t(1000))
        };
        redisSetTimeout(ctx_, tv);
        if (result->get_command_count() > 1) {
          if (!do_redis_command_batch(result->get_command_array(), result,
                                      true)) {
            std::string all_cmd;
            for (const auto &cmd : result->get_command_array()) {
              all_cmd += cmd + "\n";
            }
            // 命令执行失败
            if (box_) {
              box_->write_log(lang::LangID::LANG_REDIS_EXEC_FAIL,
                              klogger::Logger::FAILURE, all_cmd.c_str());
            }
          }
        } else {
          auto *reply = do_redis_command(result->get_command(), result, true);
          if (!reply && box_) {
            // 命令执行失败
            if (box_) {
              box_->write_log(lang::LangID::LANG_REDIS_EXEC_FAIL,
                              klogger::Logger::FAILURE,
                              result->get_command().c_str());
            }
          }
        }
        // 通知主线程
        worker_queue_.send(result);
      } else {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
      }
    }
    // 执行完所有任务
    while (worker_queue_.try_read(result)) {
      struct timeval tv {
        static_cast<long>(result->get_timeout() / std::time_t(1000)),
            static_cast<long>((result->get_timeout() % std::time_t(1000)) *
                              std::time_t(1000))
      };
      redisSetTimeout(ctx_, tv);
      if (result->get_command_count() > 1) {
        do_redis_command_batch(result->get_command_array(), result, true);
      } else {
        do_redis_command(result->get_command(), result, true);
      }
      worker_queue_.send(result);
    }
  });
  return true;
}

auto kratos::redis::RedisWorker::stop() -> bool {
  running_ = false;
  if (worker_.joinable()) {
    worker_.join();
  }
  // 执行完所有队列内存储
  update(util::get_os_time_millionsecond());
  if (ctx_) {
    redisFree(ctx_);
    ctx_ = nullptr;
  }
  return true;
}

auto kratos::redis::RedisWorker::do_command(const std::string &command,
                                            std::time_t timeout,
                                            RedisHandler handler,
                                            std::uint64_t user_data) -> bool {
  // 非协程调用从池内分配
  auto *result = kratos::allocate<ResultImpl>();
  result->set_command(command);
  result->set_timeout(timeout);
  result->set_handler(handler);
  result->set_user_data(user_data);
  return main_queue_.send(result);
}

auto kratos::redis::RedisWorker::do_command_co(const std::string &command,
                                               std::time_t timeout)
    -> std::unique_ptr<Result> {
  // 暴漏给用户服务
  std::unique_ptr<ResultImpl> result(new ResultImpl());
  result->set_command(command);
  result->set_timeout(timeout);
  result->set_coid(coro_id());
  auto success = main_queue_.send(result.get());
  if (!success) {
    return nullptr;
  }
  coro_yield();
  return std::move(result);
}

auto kratos::redis::RedisWorker::do_command(const CommandVector &cmd_vec,
                                            std::time_t timeout,
                                            RedisHandler handler,
                                            std::uint64_t user_data) -> bool {
  // 非协程调用从池内分配
  auto *result = kratos::allocate<ResultImpl>();
  for (const auto &cmd : cmd_vec) {
    result->set_command(cmd);
  }
  result->set_timeout(timeout);
  result->set_handler(handler);
  result->set_user_data(user_data);
  return main_queue_.send(result);
}

auto kratos::redis::RedisWorker::do_command_co(const CommandVector &cmd_vec,
                                               std::time_t timeout)
    -> std::unique_ptr<Result> {
  // 暴漏给用户服务
  std::unique_ptr<ResultImpl> result(new ResultImpl());
  for (const auto &cmd : cmd_vec) {
    result->set_command(cmd);
  }
  result->set_timeout(timeout);
  result->set_coid(coro_id());
  auto success = main_queue_.send(result.get());
  if (!success) {
    return nullptr;
  }
  coro_yield();
  return std::move(result);
}

auto kratos::redis::RedisWorker::update(std::time_t ms) -> std::size_t {
  auto count = 0;
  ResultImpl *result = nullptr;
  while (main_queue_.try_read(result)) {
    if (result->get_coid()) {
      coro_resume(result->get_coid());
    } else {
      if (result->get_handler()) {
        try {
          // TODO 返回值处理
          result->get_handler()(*result, result->get_user_data());
        } catch (std::exception &e) {
          if (box_) {
            box_->write_log(lang::LangID::LANG_UNEXPECTED_EXCEPTION,
                            klogger::Logger::FAILURE, "redis",
                            util::demangle(typeid(e).name()).c_str(), e.what());
          }
        }
      }
      // 非协程调用，回收到池
      kratos::box_dispose(result);
    }
    count += 1;
  }
  return count;
}

auto kratos::redis::RedisWorker::connect_redis(const std::string &host,
                                               int port,
                                               const std::string &user,
                                               const std::string &passwd)
    -> redisContext * {
  return connect_redis_internal(host, port, user, passwd);
}

auto kratos::redis::RedisWorker::connect_redis_internal(
    const std::string &host, int port, const std::string &user,
    const std::string &passwd) -> redisContext * {
  user_ = user;
  passwd_ = passwd;
  host_ = host;
  port_ = port;
  struct timeval tv;
  tv = {1, 0}; // re-connect timeout
  auto ctx = redisConnectWithTimeout(host.c_str(), port, tv);
  if (ctx == nullptr) {
    return nullptr;
  } else {
    if (ctx->err) {
      if (box_ && ctx->errstr) {
        std::stringstream error;
        error << "host[" << host << ":" << port << "], user[" << user
              << "], passwd[" << passwd << "], error:" << ctx->errstr;
        box_->write_log(lang::LangID::LANG_REDIS_CONNECT_FAIL,
                        klogger::Logger::FAILURE, error.str().c_str());
      }
      redisFree(ctx);
      ctx = nullptr;
      return nullptr;
    }
  }
  if (passwd.empty()) {
    return ctx;
  }
  auto reply = do_redis_command("AUTH " + passwd_, nullptr, false);
  if (!reply || (reply->type == REDIS_REPLY_ERROR)) {
    redisFree(ctx);
    ctx = nullptr;
    if (box_) {
      std::stringstream error;
      error << "host[" << host << ":" << port << "], user[" << user
            << "], passwd[" << passwd << "], error: AUTH failure";
      box_->write_log(lang::LangID::LANG_REDIS_CONNECT_FAIL,
                      klogger::Logger::FAILURE, error.str().c_str());
    }
  }
  if (reply) {
    freeReplyObject(reply);
  }
  return ctx;
}

auto kratos::redis::RedisWorker::do_redis_command(const std::string &command,
                                                  ResultImpl *result,
                                                  bool reconnect)
    -> redisReply * {
  if (!ctx_) {
    if (reconnect) {
      ctx_ = connect_redis_internal(host_, port_, user_, passwd_);
      if (!ctx_) {
        return nullptr;
      }
    } else {
      return nullptr;
    }
  }
  auto reply =
      reinterpret_cast<redisReply *>(redisCommand(ctx_, command.c_str()));
  if (reply == nullptr) {
    switch (ctx_->err) {
    case REDIS_ERR_PROTOCOL:
    case REDIS_ERR_OTHER:
    case REDIS_ERR_OOM: {
      if (result) {
        // error
        result->set_error(ctx_->errstr);
        result->set_error_code(RedisError::FAILURE);
      }
    } break;
    case REDIS_ERR_IO:
    case REDIS_ERR_EOF: {
      if (result) {
        // error
        result->set_error(ctx_->errstr);
        result->set_error_code(RedisError::TIMEOUT);
      }
    } break;
    }
  } else {
    if (result && (reply->type == REDIS_REPLY_ERROR)) {
      // error
      result->set_error(reply->str);
      result->set_error_code(RedisError::FAILURE);
    }
  }
  if (reply) {
    // reset reply object manually
    ctx_->reader->reply = nullptr;
  }
  if (result && reply) {
    result->set_reply(reply);
  }
  if (!result && reply) {
    freeReplyObject(reply);
  }
  return reply;
}

auto kratos::redis::RedisWorker::do_redis_command_batch(
    const CommandVector &cmd_vec, ResultImpl *result, bool reconnect) -> bool {
  if (!ctx_) {
    if (reconnect) {
      ctx_ = connect_redis_internal(host_, port_, user_, passwd_);
      if (!ctx_) {
        return false;
      }
    } else {
      return false;
    }
  }
  for (const auto &cmd : cmd_vec) {
    if (REDIS_OK != redisAppendCommand(ctx_, cmd.c_str())) {
      switch (ctx_->err) {
      case REDIS_ERR_PROTOCOL:
      case REDIS_ERR_OTHER:
      case REDIS_ERR_OOM: {
        if (result) {
          // error
          result->set_error(ctx_->errstr);
          result->set_error_code(RedisError::FAILURE);
        }
      } break;
      case REDIS_ERR_IO:
      case REDIS_ERR_EOF: {
        if (result) {
          // error
          result->set_error(ctx_->errstr);
          result->set_error_code(RedisError::TIMEOUT);
        }
      } break;
      }
      return false;
    }
  }
  for (const auto &_ : cmd_vec) {
    redisReply *reply{nullptr};
    int ret_code = redisGetReply(ctx_, (void **)&reply);
    if ((ret_code == REDIS_ERR)) {
      if (result && reply && (reply->type == REDIS_REPLY_ERROR)) {
        // error
        result->set_error(reply->str);
        result->set_error_code(RedisError::FAILURE);
      }
      return false;
    }
    if (!reply) {
      switch (ctx_->err) {
      case REDIS_ERR_PROTOCOL:
      case REDIS_ERR_OTHER:
      case REDIS_ERR_OOM: {
        if (result) {
          // error
          result->set_error(ctx_->errstr);
          result->set_error_code(RedisError::FAILURE);
        }
      } break;
      case REDIS_ERR_IO:
      case REDIS_ERR_EOF: {
        if (result) {
          // error
          result->set_error(ctx_->errstr);
          result->set_error_code(RedisError::TIMEOUT);
        }
      } break;
      }
      return false;
    } else {
      if (result) {
        if (reply->str && (reply->type != REDIS_REPLY_STRING) && (reply->str != std::string("OK"))) {
          result->set_error(reply->str);
          result->set_error_code(RedisError::FAILURE);
        }
        result->set_reply(reply);
      } else {
        freeReplyObject(reply);
      }
    }
  }
  return true;
}

kratos::redis::ResultImpl::ResultImpl() {}

kratos::redis::ResultImpl::~ResultImpl() {
  for (auto *reply : reply_vec_) {
    if (reply) {
      freeReplyObject(reply);
    }
    reply_vec_.clear();
  }
}

auto kratos::redis::ResultImpl::get_command() const -> const std::string & {
  static std::string NullString;
  if (cmd_vec_.empty()) {
    return NullString;
  }
  return cmd_vec_[cur_index_];
}

auto kratos::redis::ResultImpl::get_coid() -> std::uint64_t { return coid_; }

auto kratos::redis::ResultImpl::set_coid(std::uint64_t coid) -> void {
  coid_ = coid;
}

auto kratos::redis::ResultImpl::set_error_code(RedisError code) -> void {
  error_code_ = code;
}

auto kratos::redis::ResultImpl::set_user_data(std::uint64_t user_data) -> void {
  user_data_ = user_data;
}

auto kratos::redis::ResultImpl::get_user_data() -> std::uint64_t {
  return user_data_;
}

auto kratos::redis::ResultImpl::get_reply() const -> void * {
  if (reply_vec_.empty() || (cur_index_ >= reply_vec_.size())) {
    return nullptr;
  }
  return reply_vec_[cur_index_];
}

auto kratos::redis::ResultImpl::get_command_count() const -> std::size_t {
  return cmd_vec_.size();
}

auto kratos::redis::ResultImpl::get_command_array() const
    -> const CommandVector & {
  return cmd_vec_;
}

auto kratos::redis::ResultImpl::get_command(std::size_t index) const
    -> const std::string & {
  if (cmd_vec_.empty() || (index >= cmd_vec_.size())) {
    throw std::overflow_error("index overflow");
  }
  return cmd_vec_[index];
}

auto kratos::redis::ResultImpl::get_reply(std::size_t index) const -> void * {
  if (reply_vec_.empty() || (index >= reply_vec_.size())) {
    throw std::overflow_error("index overflow");
  }
  return reply_vec_[index];
}

auto kratos::redis::ResultImpl::next_reply() const -> bool {
  if (reply_vec_.empty() || (cur_index_ + 1 >= reply_vec_.size())) {
    return false;
  }
  const_cast<ResultImpl *>(this)->cur_index_ += 1;
  return true;
}

auto kratos::redis::ResultImpl::is_success() const -> bool {
  return (error_code_ == RedisError::SUCCESS);
}

auto kratos::redis::ResultImpl::get_error() const -> const std::string & {
  return error_;
}

auto kratos::redis::ResultImpl::get_error_code() const -> RedisError {
  return error_code_;
}

auto kratos::redis::ResultImpl::get_return(std::string &s) const -> bool {
  auto reply_void = get_reply();
  if (!reply_void || !is_success()) {
    return false;
  }
  auto *reply = reinterpret_cast<redisReply *>(reply_void);
  if (reply->type != REDIS_REPLY_STRING) {
    return false;
  }
  s.assign(reply->str, reply->len);
  return true;
}

auto kratos::redis::ResultImpl::get_return(std::vector<std::string> &v) const
    -> bool {
  auto reply_void = get_reply();
  if (!reply_void || !is_success()) {
    return false;
  }
  auto *reply = reinterpret_cast<redisReply *>(reply_void);
  if (reply->type != REDIS_REPLY_ARRAY) {
    return false;
  }
  for (int i = 0; i < (int)reply->elements; i += 1) {
    v.emplace_back(std::string(reply->element[i]->str, reply->element[i]->len));
  }
  return true;
}

auto kratos::redis::ResultImpl::get_return(std::list<std::string> &v) const
    -> bool {
  auto reply_void = get_reply();
  if (!reply_void || !is_success()) {
    return false;
  }
  auto *reply = reinterpret_cast<redisReply *>(reply_void);
  if (reply->type != REDIS_REPLY_ARRAY) {
    return false;
  }
  for (int i = 0; i < (int)reply->elements; i += 1) {
    v.emplace_back(std::string(reply->element[i]->str, reply->element[i]->len));
  }
  return true;
}

auto kratos::redis::ResultImpl::get_return(
    std::unordered_map<std::string, std::string> &m) const -> bool {
  auto reply_void = get_reply();
  if (!reply_void || !is_success()) {
    return false;
  }
  auto *reply = reinterpret_cast<redisReply *>(reply_void);
  if (reply->type != REDIS_REPLY_ARRAY) {
    return false;
  }
  for (int i = 0; i < (int)reply->elements; i += 2) {
    m.emplace(
        std::string(reply->element[i]->str, reply->element[i]->len),
        std::string(reply->element[i + 1]->str, reply->element[i + 1]->len));
  }
  return true;
}

auto kratos::redis::ResultImpl::get_return(
    std::unordered_set<std::string> &s) const -> bool {
  auto reply_void = get_reply();
  if (!reply_void || !is_success()) {
    return false;
  }
  auto *reply = reinterpret_cast<redisReply *>(reply_void);
  if (reply->type != REDIS_REPLY_ARRAY) {
    return false;
  }
  for (int i = 0; i < (int)reply->elements; i += 1) {
    s.emplace(std::string(reply->element[i]->str, reply->element[i]->len));
  }
  return true;
}

auto kratos::redis::ResultImpl::get_return(std::size_t &size) const -> bool {
  auto reply_void = get_reply();
  if (!reply_void || !is_success()) {
    return false;
  }
  auto *reply = reinterpret_cast<redisReply *>(reply_void);
  if (reply->type != REDIS_REPLY_INTEGER && reply->type != REDIS_REPLY_STRING) {
    return false;
  }
  if (reply->type == REDIS_REPLY_INTEGER) {
    size = static_cast<std::size_t>(reply->integer);
  } else {
    try {
      size = std::stoll(reply->str);
    } catch (...) {
      return false;
    }
  }
  return true;
}

auto kratos::redis::ResultImpl::get_return(bool &b) const -> bool {
  auto reply_void = get_reply();
  if (!reply_void || !is_success()) {
    return false;
  }
  auto *reply = reinterpret_cast<redisReply *>(reply_void);
  if (reply->type == REDIS_REPLY_INTEGER) {
    b = (reply->integer != 0ll);
  } else if (reply->type == REDIS_REPLY_STATUS) {
    b = (reply->str == std::string("OK"));
  } else {
    return false;
  }
  return true;
}

auto kratos::redis::ResultImpl::set_error(const char *string) -> void {
  error_ = string;
}

auto kratos::redis::ResultImpl::set_reply(void *reply) -> void {
  reply_vec_.push_back(reply);
}

auto kratos::redis::ResultImpl::get_timeout() -> std::time_t {
  return timeout_;
}

auto kratos::redis::ResultImpl::set_timeout(std::time_t ms) -> void {
  timeout_ = ms;
}

auto kratos::redis::ResultImpl::set_handler(RedisHandler handler) -> void {
  handler_ = handler;
}

auto kratos::redis::ResultImpl::get_handler() -> RedisHandler {
  return handler_;
}

auto kratos::redis::ResultImpl::set_command(const std::string &command)
    -> void {
  cmd_vec_.push_back(command);
}
